package datahub

import (
	"sync"
)

// schemaRegistryClient caches topic record schemas and their version IDs,
// fetching them through a DataHubApi when they are not cached yet.
type schemaRegistryClient struct {
	// dh is the API used to fetch schemas and version IDs on a cache miss.
	dh         DataHubApi
	// schemaMap is keyed by "project/topic"; each entry is a nested map from
	// a schema's String() form to its version ID.
	schemaMap  sync.Map
	// versionMap is keyed by "project/topic"; each entry is a nested map from
	// a version ID to the RecordSchema fetched for it.
	versionMap sync.Map
}

// NewSchemaClient builds a schema registry client that performs its lookups
// through dh. Both caches start out empty.
func NewSchemaClient(dh DataHubApi) *schemaRegistryClient {
	client := new(schemaRegistryClient)
	client.dh = dh
	return client
}

// getSchemaByVersion returns the record schema registered under the given
// version of project/topic, consulting the in-memory cache before calling the
// DataHub API.
//
// Per-topic caches are kept in versionMap as *sync.Map. A sync.Map must not be
// copied after first use; storing and type-asserting it by value copies its
// internal lock state on every call (reported by go vet's copylocks check) and
// lets concurrent callers overwrite each other's per-topic cache.
//
// The returned pointer refers to a copy of the cached RecordSchema value, not
// to the cache entry itself. Concurrent misses for the same version may each
// fetch from the API; the last store wins. On a fetch failure it returns a nil
// schema and the underlying error, and nothing is cached for that version.
func (client *schemaRegistryClient) getSchemaByVersion(project, topic string, version int) (*RecordSchema, error) {
	key := client.genKey(project, topic)

	value, ok := client.versionMap.Load(key)
	if !ok {
		// LoadOrStore keeps a single per-topic cache even when several
		// goroutines miss on the same key at once.
		value, _ = client.versionMap.LoadOrStore(key, &sync.Map{})
	}
	topicCache := value.(*sync.Map)

	if cached, ok := topicCache.Load(version); ok {
		schema := cached.(RecordSchema)
		return &schema, nil
	}

	newSchema, err := client.fetchSchemaById(project, topic, version)
	if err != nil {
		return nil, err
	}

	// Cache by value so later mutations through the returned pointer do not
	// alter the cached struct itself.
	schema := *newSchema
	topicCache.Store(version, schema)

	return &schema, nil
}

// getVersionBySchema returns the version ID under which schema is registered
// for project/topic, consulting the in-memory cache before calling the DataHub
// API. Cache entries are keyed by schema.String().
//
// Per-topic caches are kept in schemaMap as *sync.Map. A sync.Map must not be
// copied after first use; storing and type-asserting it by value copies its
// internal lock state on every call (reported by go vet's copylocks check) and
// lets concurrent callers overwrite each other's per-topic cache.
//
// Concurrent misses for the same schema may each query the API; the last store
// wins. On a lookup failure it returns -1 and the underlying error, and
// nothing is cached for that schema.
func (client *schemaRegistryClient) getVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
	key := client.genKey(project, topic)

	value, ok := client.schemaMap.Load(key)
	if !ok {
		// LoadOrStore keeps a single per-topic cache even when several
		// goroutines miss on the same key at once.
		value, _ = client.schemaMap.LoadOrStore(key, &sync.Map{})
	}
	topicCache := value.(*sync.Map)

	schemaKey := schema.String()
	if cached, ok := topicCache.Load(schemaKey); ok {
		return cached.(int), nil
	}

	version, err := client.fetchVersionBySchema(project, topic, schema)
	if err != nil {
		return -1, err
	}
	topicCache.Store(schemaKey, version)

	return version, nil
}

// fetchSchemaById asks the DataHub API for the schema registered under the
// given version of project/topic, bypassing the cache. It returns a pointer to
// the RecordSchema field of the API result, or nil and the API error.
func (client *schemaRegistryClient) fetchSchemaById(project, topic string, version int) (*RecordSchema, error) {
	resp, err := client.dh.GetTopicSchemaByVersion(project, topic, version)
	if err != nil {
		return nil, err
	}

	recordSchema := &resp.RecordSchema
	return recordSchema, nil
}

// fetchVersionBySchema asks the DataHub API which version ID schema is
// registered under for project/topic, bypassing the cache. It returns the
// VersionId of the API result, or -1 and the API error.
func (client *schemaRegistryClient) fetchVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
	resp, err := client.dh.GetTopicSchemaBySchema(project, topic, schema)
	if err != nil {
		return -1, err
	}

	versionID := resp.VersionId
	return versionID, nil
}

// genKey builds the cache key for a topic by joining the project and topic
// names with a slash.
func (client *schemaRegistryClient) genKey(project, topic string) string {
	const separator = "/"
	key := project + separator + topic
	return key
}
